package com.rtsapp.server.network.protocol.rpc.client.impl;

import com.rtsapp.server.logger.Logger;
import com.rtsapp.server.network.protocol.ProtocolConstants;
import com.rtsapp.server.network.protocol.rpc.client.RPCCallback;
import com.rtsapp.server.network.protocol.rpc.codec.RPCContext;
import com.rtsapp.server.network.protocol.rpc.codec.RPCRequest;
import com.rtsapp.server.network.protocol.rpc.codec.RPCResponse;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;


/**
 * RPCFuture RPC异步Future
 * 用于同步调用或者异步回调
 * @author admin
 */
public class RPCFuture implements Future<Object>{

	private static final Logger logger =com.rtsapp.server.logger.LoggerFactory.getLogger( RPCFuture.class );
	
	private Sync sync;
	private RPCContext rpcCtx ;
	
	private ReentrantLock lock = new ReentrantLock( );
	private List<RPCCallback> pendingCallbacks = new ArrayList<RPCCallback>();
	
	private RPCClientInHandler handler;
	private long startTime;
	private long responseTimeThreshold = 300;
	
	
	static class Sync extends AbstractQueuedSynchronizer{
		
		private static final long serialVersionUID = 1L;

		private final int done = 1;
		private final int pending = 0;
		
		@Override
		protected boolean tryAcquire( int acquires ) {
			return this.getState() == done? true : false;
		}

		@Override
		protected boolean tryRelease(int arg) {
			if( getState() == pending ){
				if( compareAndSetState( pending, 1 ) ){
					return true;
				}
			}
			return false;
		}
		
		public boolean isDone(){
			getState( );
			return getState() == done;
		}
		
	}
	
	public RPCFuture( RPCContext rpcCtx, RPCClientInHandler  handler ){
		this.sync = new Sync();
		this.rpcCtx = rpcCtx;
		this.handler = handler;
		this.startTime = System.currentTimeMillis();
	}
	
	
	/**
	 * 添加回调的
	 * @param callback
	 */
	public RPCFuture addCallback( RPCCallback callback ){
		
		lock.lock();
		try{
			if( isDone() ){
				runCallback( callback );
			}else{
				this.pendingCallbacks.add( callback );
			}
		}finally{
			lock.unlock();
		}
		return this;
	}
	
	
	private void runCallback(RPCCallback callback) {
		RPCResponse response = rpcCtx.getResponse();
		
		byte status =  response.getErrorNo();
		if( status == ProtocolConstants.RPCConstants.RPCStatus.ok  ){
			callback.onCompleted( response.getResult( ) );
		}else if( status == ProtocolConstants.RPCConstants.RPCStatus.exception ){

			String message = "Got exception in server|objName="+rpcCtx.getRequest().getObjectName()+"|methodName="+rpcCtx.getRequest().getMethodName()+"|server msg="+response.getResult( );
			if( response.getResult() != null && response.getResult() instanceof Throwable ) {
				Throwable cause = ( Throwable ) response.getResult( );
				callback.onError( new RuntimeException( message, cause ) );
			}else{
				callback.onError( new RuntimeException( message) );
			}


		}else if( status == ProtocolConstants.RPCConstants.RPCStatus.unknownError ){
			callback.onError(  new RuntimeException("Got unknown error in server|objName="+rpcCtx.getRequest().getObjectName()+"|methodName="+rpcCtx.getRequest().getMethodName()+"|server msg="+response.getResult( ) ) );
		}
		
	}

	private void invokeCallbacks(){

		lock.lock();
		try{
			for( final RPCCallback callback : pendingCallbacks ){
				runCallback( callback );
			}
		}catch( Throwable ex ){
			logger.error( "runCallback error", ex );
		}finally{
			lock.unlock();
		}
		
		
	}

	public void done( RPCResponse response ){
		
		this.rpcCtx.setResponse( response );
		byte type = rpcCtx.getRequest().getRpcType();
		if( type == ProtocolConstants.RPCConstants.RPCType.normal ){
			sync.release( 1 );
		}else if( type == ProtocolConstants.RPCConstants.RPCType.async ){
			sync.release( 1 );
			invokeCallbacks( );
		}else if( type == ProtocolConstants.RPCConstants.RPCType.oneway ){
			
		}
		
		
		long responseTime = System.currentTimeMillis() - startTime;
		RPCRequest request = this.rpcCtx.getRequest();
		if(responseTime > this.responseTimeThreshold){
		    logger.warn("Service response time is too slow|serviceName="+ request.getObjectName()+"|funcName="+ request.getMethodName()+"|responseTime=" + responseTime );
		}
	}



	@Override
	public boolean cancel(boolean mayInterruptIfRunning) {
		throw new UnsupportedOperationException();
	}


	@Override
	public boolean isCancelled() {
		throw new UnsupportedOperationException();
	}


	@Override
	public boolean isDone() {
		return sync.isDone();
	}


	@Override
	public Object get() throws InterruptedException, ExecutionException {
		sync.acquire( -1 );
		return processResponse( );
	}


	@Override
	public Object get(long timeout, TimeUnit unit) throws InterruptedException,
			ExecutionException, TimeoutException {
		
		boolean success = sync.tryAcquireNanos( -1, unit.toNanos( timeout ) );
		if( success ){
			return processResponse( );
		}else{
			throw new RuntimeException("Timeout exception|objName="+rpcCtx.getRequest().getObjectName()+"|methodName="+rpcCtx.getRequest().getMethodName() );
		}
	}
	
	
	private Object processResponse( ){
		
		byte type = rpcCtx.getRequest().getRpcType();
		
		if( type == ProtocolConstants.RPCConstants.RPCType.normal || type == ProtocolConstants.RPCConstants.RPCType.async ){
			RPCResponse response = rpcCtx.getResponse();
			byte status = response.getErrorNo();
			if( status == ProtocolConstants.RPCConstants.RPCStatus.exception ){
				throw new RuntimeException("Got exception in server|objName="+rpcCtx.getRequest().getObjectName()+"|methodName="+rpcCtx.getRequest().getMethodName()+"|server msg="+response.getResult( ) );
			}else if( status == ProtocolConstants.RPCConstants.RPCStatus.unknownError ){
				throw new RuntimeException("Got unknown error in server|objName="+rpcCtx.getRequest().getObjectName()+"|methodName="+rpcCtx.getRequest().getMethodName()+"|server msg="+response.getResult( ) );
			}
		}		
		
		return rpcCtx.getResponse().getResult( );
		
	}
	
	
}
